<?php


namespace app\socket\controller;

//use Channel\Client;
use app\common\model\DebugModel;
use app\socket\services\LoginService;
use think\facade\Db;
use think\worker\Server;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Lib\Timer;

//报告NOTICE之外的所有错误
error_reporting(E_ALL & ~E_NOTICE);
//ini_set("display_errors", "On");

/**
 * Class Websocket
 * @package app\http
 * TP里面用workerman的用法测试
 */
class Websocket extends Server
{

    protected $HEARTBEAT_TIME = 55;
    // 全局变量，保存当前进程的客户端连接数
    protected $connection_count = 0;
    protected $socket = 'websocket://0.0.0.0:9092';
    //protected $host = '0.0.0.0';
    protected $option = ['count' => 4, 'name' => 'newWebsocket'];
    protected $mysql_host = '127.0.0.1';
    protected $mysql_port = '3306';
    protected $mysql_user = 'root';
    protected $mysql_pwd = 'root';
    protected $mysql_db_name = 'workerman';

    protected function init()
    {
        // 初始化一个Channel服务端
        //$channel_server = new \Channel\Server('0.0.0.0', 2206);
        //$channel_server->name = 'channel_server';
        parent::init(); // TODO: Change the autogenerated stub
    }

    /**
     * @param $worker
     * 初始化workerman
     */
    public function onWorkerStart($worker)
    {
        echo "================onWorkerStart=============";
        //保存每个用户的connection
        $worker->connection_uids = array();
//        global $db;
//        $db = 100;
        // 新增加一个属性，用来保存uid到connection的映射(uid是用户id或者客户端唯一标识)
        //参考文档:http://doc.workerman.net/faq/send-data-to-client.html
        //----------------------------------------初始化mysql链接-------------------------
        // 将db实例存储在全局变量中(也可以存储在某类的静态成员中),参考手册http://doc.workerman.net/components/workerman-mysql.html
        //global $db;
//        global $inner_worker;
        //$db = new \Workerman\MySQL\Connection($this->mysql_host, $this->mysql_port, $this->mysql_user, $this->mysql_pwd, $this->mysql_db_name);
        //self::websocketAsyncTcpConnection($worker);
        //----------------------------------------内部监听其他的端口------------------------
//        $inner_worker = new \Workerman\Worker('tcp://0.0.0.0:2347');
//        /**
//         * 多个进程监听同一个端口（监听套接字不是继承自父进程）
//         * 需要开启端口复用，不然会报Address already in use错误
//         */
//
//        $inner_worker->count = 4;
//        $inner_worker->reusePort = true;
//        //获取转发过来的消息
//        $inner_worker->onMessage = function ($connection, $message) use ($worker) {
////            echo "轮到我发消息了\n";
//            // 给connection临时设置一个lastMessageTime属性，用来记录上次收到消息的时间
//            $connection->lastMessageTime = time();
//            // 解析数据
//            $resData = json_decode($message, true);
//            // 这里处理设备发来的数据 $data
//            // 把tcp设备消息发送数据转发给websocket,前端根据不同消息类型展示不同的数据
//            foreach ($worker->connections as $websocket_con) {
//                //发给websocket绑定的某一个用户
//                $websocket_con->send($message);
//                self::sendMessageByUid(100, 'websock我想你了',$websocket_con);
//            }
//        };
//        $inner_worker->onWorkerStart = function ($inner_worker) {
//            //---------------------定义TCP的心跳-------------------
//            Timer::add(1, function () use ($inner_worker) {
//                $time_now = time();
//                foreach ($inner_worker->connections as $connection) {
//                    // 有可能该connection还没收到过消息，则lastMessageTime设置为当前时间
//                    if (empty($connection->lastMessageTime)) {
//                        $connection->lastMessageTime = $time_now;
//                        continue;
//                    }
//                    // 上次通讯时间间隔大于心跳间隔，则认为客户端已经下线，关闭连接
//                    if ($time_now - $connection->lastMessageTime > ($this->HEARTBEAT_TIME)) {
//                        $connection->close();
//                    } else {
//                        $connection->send('{"type":"ping"}');
//                    }
//                }
//            });
//        };
//        $inner_worker->onConnect = function ($connection) {
//            // 有新的客户端连接时，连接数+1
//            global $connection_count;
//            ++$connection_count;
//            echo "tcp closed\n";
//        };
        // 执行监听
//        $inner_worker->listen();
        //------------------------------------------创建一个定时器-----------------------------
        // 1秒后启动一个udp客户端，连接1234端口并发送字符串 hi
        Timer::add(1, function ()use($worker) {
            $udp_connection = new AsyncTcpConnection('udp://127.0.0.1:1234');
            $udp_connection->onConnect = function ($udp_connection)use($worker) {
                $udp_connection->send('hi');
            };
            $udp_connection->onMessage = function ($udp_connection, $data) {
                // 收到服务端返回的数据 hello
                echo "recv $data\r\n";
                // 关闭连接
                $udp_connection->close();
            };
            $udp_connection->connect();
        }, null, false);
        //-------------------------------------------心跳检测---------------------------------
        Timer::add(5, function () use ($worker) {
            $time_now = time();
            foreach ($worker->connections as $connection) {
                // 有可能该connection还没收到过消息，则lastMessageTime设置为当前时间
                if (empty($connection->lastMessageTime)) {
                    $connection->lastMessageTime = $time_now;
                    continue;
                }
                // 上次通讯时间间隔大于心跳间隔，则认为客户端已经下线，关闭连接
                if ($time_now - ($connection->lastMessageTime) > ($this->HEARTBEAT_TIME)) {
                    $connection->close();
                } else {
                    $connection->send('{"type":"ping"}');
                }
            }
        });
        echo "Worker starting...\n";
    }

    /**
     * @param $connection
     *  当客户端连接时触发
     *  如果业务不需此回调可以删除onConnect
     *  $connection->client_id 连接id
     */
    public function onConnect($connection)
    {
        // 设置连接的onMessage回调,跟下面的onMessage方法是一样的效果,不过一般建议写成单独的方法体处理
        //$connection->onMessage = function($connection, $data)
        //{
        //var_dump($data);
        //$connection->send('receive success');
        //};
        global $id;
        $id = $connection->id;
        ++$id;
        // 有新的客户端连接时，连接数+1
        global $connection_count;
        ++$connection_count;
        // 给connection对象动态添加一个属性，用来保存当前连接发来多少个请求
        $connection->messageCount = 0;
        echo "new connection from ip " . $connection->getRemoteIp() . ":" . $connection->getRemotePort() . " ID：{$id}\n";
    }

    /**
     * @param $connection
     * @param $data 内容
     *  当客户端发来消息时触发
     */
    public function onMessage($connection, $data)
    {
        global $inner_worker;
//        global $db;
        // 给connection临时设置一个lastMessageTime属性，用来记录上次收到消息的时间
        $connection->lastMessageTime = time();
        $connection->connection_uids = array();
        // 解析数据
        $resData = json_decode($data, true);
        //--------------------每个连接接收100个请求后就不再接收数据-------------------
//        $limit = 100;
//        if (++$connection->messageCount > $limit) {
//            $connection->pauseRecv();
//            // 30秒后恢复接收数据
//            Timer::add(30, function ($connection) {
//                $connection->resumeRecv();
//            }, array($connection), false);
//        }
//        DebugModel::addInfo(0,'message',$resData);
        //找到workerman的生成的ID,就绑定用户的userID,因为只要下线了ID会变,所以在下线的时候要做下线处理
        $id = $connection->id;
        $type = $resData['type'];
        try{
            switch ($type) {
                case 'pong':
                    //心跳应答
                    break;
                case 'bindUser':
                    //处理数据表中对用的用户userID的workerman的ID
                    break;
                case 'init':
                    //绑定用户到$client_id
                    $client_id = $connection->id;
                    $connection->uid = $resData['uid'];
                    $connection->connection_uids[$resData['uid']] = $connection;
                    //把uid加入数组中,设置全局(global)引用
                    //把websocket的消息发给tcp设备
                    foreach ($inner_worker->connections as $tcp_con) {
                        //发给tcp绑定的某一个用户
                        $uid = $resData['uid'];
                        if (isset($connection->connection_uids[$uid])) {
                            $tcp_con->send('300');
                        }
                    }
                    break;
                case 'login':
                    LoginService::Login($resData,$connection);
                    break;
                case 'send_message':
                    if ($resData['to'] == 'all') {
                        echo "向全部用户发送消息\n";
                        self::broadcast($resData['message'],$connection);
                    } else {
                        echo "向用户{$resData['to']}发送消息\n";
                        self::sendMessageByUid($resData['to'], $resData['message'],$connection);
                    }
                    break;
                default :
                    // --------注意不能在最开始就发送---不然就会报错--------------
                    //$connection->send($db);
                    //测试发送数据,(表这些按照你自己的实际情况填写)前端可以收到
                    //$data = Db::table('tp_admin')->select();
                    //$connection->send(json_encode($data));
                    break;
            }
        }catch (\Exception $e){
            echo "出现了错误：".$e->getMessage()."\n";
        }

//        $connection->send($data);
    }
    //向所有用户发送消息
    protected static function broadcast($message,$connection)
    {
        foreach ($connection as $conn) {
            $conn->send($message);
        }
    }

    //向某个用户发送消息
    protected static function sendMessageByUid($uid, $message,$connection)
    {
        if (isset($connection->connection_uids[$uid])) {
            $conn = $connection->connection_uids[$uid];
            $conn->send($message);
        }
    }

    public function onClose($connection)
    {
        // 客户端关闭时，连接数-1
        global $connection_count;
        $connection_count--;
        global $id;
        $id--;
        //如果当前的id下线了 就操作用户的数据表
        //db('shop_user')->where([['status','=',1]])->update(['worker_id'=>0]);
        echo "connection closed\n";
    }

    public function onError($connection, $code, $msg)
    {
        echo "error $code $msg\n";
    }


    /**
     * @param $worker 这个主监听的worker,不是异步监听或者内部监听
     * @throws \Exception
     * 异步访问外部websocket服务，并设置以哪个本地ip及端口访问
     */
    protected function websocketAsyncTcpConnection($worker)
    {
        global $AsyncTcpWorker;
        // 设置访问对方主机的本地ip及端口(每个socket连接都会占用一个本地端口)
        $context_option = array(
            'socket' => array(
                // ip必须是本机网卡ip，并且能访问对方主机，否则无效
                'bindto' => '127.0.0.1:2346',
            ),
            // ssl选项，参考http://php.net/manual/zh/context.ssl.php
            //'ssl' => array(
            // 本地证书路径。 必须是 PEM 格式，并且包含本地的证书及私钥。
            //'local_cert'        => '/your/path/to/pemfile',
            // local_cert 文件的密码。
            //'passphrase'        => 'your_pem_passphrase',
            // 是否允许自签名证书。
            //'allow_self_signed' => true,
            // 是否需要验证 SSL 证书。
            //'verify_peer'       => false
            //)
        );
        // 发起异步连接
        $AsyncTcpWorker = new AsyncTcpConnection('ws://127.0.0.1:2345', $context_option);
        // 设置以ssl加密方式访问
        //$con->transport = 'ssl';
        $AsyncTcpWorker->onConnect = function ($con) {
            echo 'AsyncTcpConnection start....' . "\r\n";
        };
        $AsyncTcpWorker->onClose = function ($con) {
            // 如果连接断开，则在1秒后重连
            $con->reConnect(1);
        };
        $AsyncTcpWorker->onMessage = function ($con, $data) use ($worker) {
            foreach ($worker->connections as $connection) {
                //$connection->send('110110');
            }
            //echo $data;
        };
        $AsyncTcpWorker->connect();
    }


    /**
     * 异步访问外部tcp服务
     */
    protected function tcpAsyncTcpConnection($worker)
    {
        // 发起异步连接
        $tcp_worker = new AsyncTcpConnection('tcp://127.0.0.1:2347');
        // 设置以ssl加密方式访问
        //$con->transport = 'ssl';
        $tcp_worker->onConnect = function ($con) {
            echo 'AsyncTcpConnection start....' . "\r\n";
        };
        $tcp_worker->onClose = function ($con) {
            // 如果连接断开，则在1秒后重连
            $con->reConnect(1);
        };
        $tcp_worker->onMessage = function ($con, $data) use ($worker) {
            foreach ($worker->connections as $connection) {
                //$connection->send('110110');
            }
            //echo $data;
        };
        $tcp_worker->connect();
    }


    /**
     * @param $connection
     * 内部流量控制
     * 将当前连接的数据流导入到目标连接。内置了流量控制。此方法做TCP代理非常有用
     * onMessage方法里面调用
     */
    protected function InternalFlowControl($connection)
    {
        // 建立本地80端口的异步连接
        $connection_to_80 = new AsyncTcpConnection('tcp://127.0.0.1:80');
        // 设置将当前客户端连接的数据导向80端口的连接
        $connection->pipe($connection_to_80);
        // 设置80端口连接返回的数据导向客户端连接
        $connection_to_80->pipe($connection);
        // 执行异步连接
        $connection_to_80->connect();
    }

    /**
     * @param $connection
     * 使当前连接继续接收数据。此方法与Connection::pauseRecv配合使用，对于上传流量控制非常有用
     * 这个方法在onMessage方法里面调用
     */
    protected function pauseRecvConnection($connection, $limit)
    {
        // 每个连接接收100个请求后就不再接收数据
        if (++$connection->messageCount > $limit) {
            //使当前连接停止接收数据。该连接的onMessage回调将不会被触发。此方法对于上传流量控制非常有用
            $connection->pauseRecv();
            // 30秒后恢复接收数据
            Timer::add(30, function ($connection) {
                //使当前连接继续接收数据
                $connection->resumeRecv();
            }, array($connection), false);
        }
    }

    /**
     * @param $worker
     * @param string $event_name
     * 一般结合在onWorkerStart里面引用初始化操作
     * 订阅主题和相关的操作 返回给客户端
     */
    protected function ChannelGroupServer($worker, $event_name_title = '广播')
    {
        // Channel客户端连接到Channel服务端
        Client::connect('127.0.0.1', 2206);
        // 以自己的进程id为事件名称
        $event_name = $worker->id;
        // 订阅worker->id事件并注册事件处理函数
        Client::on($event_name, function ($event_data) use ($worker) {
            $to_connection_id = $event_data['to_connection_id'];
            $message = $event_data['content'];
            if (!isset($worker->connections[$to_connection_id])) {
                echo "connection not exists\n";
                return;
            }
            $to_connection = $worker->connections[$to_connection_id];
            $to_connection->send($message);
        });
        // 收到广播事件后向当前进程内所有客户端连接发送广播数据
        Client::on($event_name_title, function ($event_data) use ($worker) {
            $message = $event_data['content'];
            foreach ($worker->connections as $connection) {
                $connection->send($message);
            }
        });

    }


    /**
     * @param $data 接受客户端发来的内容
     * 一般结合在onMessage里面引用初始化操作
     * 发送主题相关内容到客户端
     */
    protected function ChannelGroupClient($data, $event_name_title)
    {
        //接收的data必须格式化之后才能用
        $data_connet = json_decode($data);
        if (empty($data_connet['content'])) return;
        // 是向某个worker进程中某个连接推送数据
        if (isset($data_connet['to_worker_id']) && isset($data_connet['to_connection_id'])) {
            $event_name = $data_connet['to_worker_id'];
            $to_connection_id = $data_connet['to_connection_id'];
            $content = $data_connet['content'];
            Client::publish($event_name, array(
                'to_connection_id' => $to_connection_id,
                'content' => $content
            ));
        } else {
            // 是全局广播数据
            $content = $data_connet['content'];
            Client::publish($event_name_title, array(
                'content' => $content
            ));
        }
    }
}